/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hydraql.benchmark.core.workloads;

import com.hydraql.benchmark.core.ByteIterator;
import com.hydraql.benchmark.core.Client;
import com.hydraql.benchmark.core.DB;
import com.hydraql.benchmark.core.NumericByteIterator;
import com.hydraql.benchmark.core.Status;
import com.hydraql.benchmark.core.StringByteIterator;
import com.hydraql.benchmark.core.Utils;
import com.hydraql.benchmark.core.Workload;
import com.hydraql.benchmark.core.WorkloadException;
import com.hydraql.benchmark.core.generator.DiscreteGenerator;
import com.hydraql.benchmark.core.generator.Generator;
import com.hydraql.benchmark.core.generator.HotspotIntegerGenerator;
import com.hydraql.benchmark.core.generator.IncrementingPrintableStringGenerator;
import com.hydraql.benchmark.core.generator.NumberGenerator;
import com.hydraql.benchmark.core.generator.RandomDiscreteTimestampGenerator;
import com.hydraql.benchmark.core.generator.ScrambledZipfianGenerator;
import com.hydraql.benchmark.core.generator.SequentialGenerator;
import com.hydraql.benchmark.core.generator.UniformLongGenerator;
import com.hydraql.benchmark.core.generator.UnixEpochTimestampGenerator;
import com.hydraql.benchmark.core.generator.ZipfianGenerator;
import com.hydraql.benchmark.core.measurements.Measurements;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * A specialized workload dealing with time series data, i.e. series of discrete events associated
 * with timestamps and identifiers. For this workload, identities consist of a {@link String}
 * <b>key</b> and a set of {@link String} <b>tag key/value</b> pairs.
 * <p>
 * For example:
 * <table border="1">
 * <tr>
 * <th>Time Series Key</th>
 * <th>Tag Keys/Values</th>
 * <th>1483228800</th>
 * <th>1483228860</th>
 * <th>1483228920</th>
 * </tr>
 * <tr>
 * <td>AA</td>
 * <td>AA=AA, AB=AA</td>
 * <td>42.5</td>
 * <td>1.0</td>
 * <td>85.9</td>
 * </tr>
 * <tr>
 * <td>AA</td>
 * <td>AA=AA, AB=AB</td>
 * <td>-9.4</td>
 * <td>76.9</td>
 * <td>0.18</td>
 * </tr>
 * <tr>
 * <td>AB</td>
 * <td>AA=AA, AB=AA</td>
 * <td>-93.0</td>
 * <td>57.1</td>
 * <td>-63.8</td>
 * </tr>
 * <tr>
 * <td>AB</td>
 * <td>AA=AA, AB=AB</td>
 * <td>7.6</td>
 * <td>56.1</td>
 * <td>-0.3</td>
 * </tr>
 * </table>
 * <p>
 * This table shows four time series with 3 measurements at three different timestamps. Keys, tags,
 * timestamps and values (numeric only at this time) are generated by this workload. For details on
 * properties and behavior, see the {@code workloads/tsworkload_template} file. The Javadocs will
 * focus on implementation and how {@link DB} clients can parse the workload.
 * <p>
 * In order to avoid having existing DB implementations implement a brand new interface this
 * workload uses the existing APIs to encode a few special values that can be parsed by the client.
 * The special values include the timestamp, numeric value and some query (read or scan) parameters.
 * As an example on how to parse the fields, see {@link BasicTSDB}.
 * <p>
 * <b>Timestamps</b>
 * <p>
 * Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS},
 * {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the {@code timestampunits}
 * property. For calls to {@link DB#insert(String, String, Map)} and
 * {@link DB#update(String, String, Map)}, the timestamp is added to the {@code values} map encoded
 * in a {@link NumericByteIterator} with the key defined in the {@code timestampkey} property
 * (defaulting to "YCSBTS"). To pull out the timestamp when iterating over the values map, cast the
 * {@link ByteIterator} to a {@link NumericByteIterator} and call
 * {@link NumericByteIterator#getLong()}.
 * <p>
 * Note that for calls to {@link DB#update(String, String, Map)}, timestamps earlier than the
 * timestamp generator's timestamp will be chosen at random to mimic a lambda architecture or old
 * job re-reporting some data.
 * <p>
 * For calls to {@link DB#read(String, String, Set, Map)} and
 * {@link DB#scan(String, String, int, Set, Vector)}, timestamps are encoded in a
 * {@link StringByteIterator} in a key/value format with the {@code tagpairdelimiter} separator. E.g
 * {@code YCSBTS=1483228800}. If {@code querytimespan} has been set to a positive value then the
 * value will include a range with the starting (oldest) timestamp followed by the
 * {@code querytimespandelimiter} separator and the ending (most recent) timestamp. E.g.
 * {@code YCSBTS=1483228800-1483228920}.
 * <p>
 * For calls to {@link DB#delete(String, String)}, encoding is the same as reads and scans but
 * key/value pairs are separated by the {@code deletedelimiter} property value.
 * <p>
 * By default, the starting timestamp is the current system time without any rounding. All
 * timestamps are then offsets from that starting value.
 * <p>
 * <b>Values</b>
 * <p>
 * Similar to timestamps, values are encoded in {@link NumericByteIterator}s and stored in the
 * values map with the key defined in {@code valuekey} (defaulting to "YCSBV"). Values can either be
 * 64 bit signed {@link long}s or double precision {@link double}s depending on the
 * {@code valuetype} or {@code dataintegrity} properties. When parsing out the value, always call
 * {@link NumericByteIterator#isFloatingPoint()} to determine whether or not to call
 * {@link NumericByteIterator#getDouble()} (true) or {@link NumericByteIterator#getLong()} (false).
 * <p>
 * When {@code dataintegrity} is set to true, then the value is always set to a 64 bit signed
 * integer which is the Java hash code of the concatenation of the key and map of values (sorted on
 * the map keys and skipping the timestamp and value entries) OR'd with the timestamp of the data
 * point. See {@link #validationFunction(String, long, TreeMap)} for the implementation.
 * <p>
 * <b>Keys and Tags</b>
 * <p>
 * As mentioned, the workload generates strings for the keys and tags. On initialization three
 * string generators are created using the {@link IncrementingPrintableStringGenerator}
 * implementation. Then the generators fill three arrays with values based on the number of keys,
 * the number of tags and the cardinality of each tag key/value pair. This implementation gives us
 * time series like the example table where every string starts at something like "AA" (depending on
 * the length of keys, tag keys and tag values) and continuing to "ZZ" wherein they rollover back to
 * "AA".
 * <p>
 * Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear more than
 * once per time series. If the workload is configured for four tags with a tag key length of 2, the
 * keys would be "AA", "AB", "AC" and "AD".
 * <p>
 * Each tag key is then associated with a tag value. Tag values may appear more than once in each
 * time series. E.g. time series will usually start with the tags "AA=AA", "AB=AA", "AC=AA" and
 * "AD=AA". The {@code tagcardinality} property determines how many unique values will be generated
 * per tag key. In the example table above, the {@code tagcardinality} property would have been set
 * to {@code 1,2} meaning tag key "AA" would always have the tag value "AA" given a cardinality of
 * 1. However tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This
 * cardinality map, along with the number of unique time series keys determines how many unique time
 * series are generated for the workload. Tag values share a common array of generated strings to
 * save on memory.
 * <p>
 * <b>Operation Order</b>
 * <p>
 * The default behavior of the workload (for inserts and updates) is to generate a value for each
 * time series for a given timestamp before incrementing to the next timestamp and writing values.
 * This is an ideal workload and some time series databases are designed for this behavior. However
 * in the real-world events will arrive grouped close to the current system time with a number of
 * events being delayed, hence their timestamps are further in the past. The {@code delayedseries}
 * property determines the percentage of time series that are delayed by up to
 * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that 5% of the time
 * series will be written with timestamps earlier than the timestamp generator's current time.
 * </p>
 * <b>Reads and Scans</b>
 * <p>
 * For benchmarking queries, some common tasks implemented by almost every time series database are
 * available and are passed in the fields {@link Set}:
 * <p>
 * <b>GroupBy</b> - A common operation is to aggregate multiple time series into a single time
 * series via common parameters. For example, a user may want to see the total network traffic in a
 * data center so they'll issue a SQL query like:
 * <code>SELECT value FROM timeseriesdb GROUP BY datacenter ORDER BY SUM(value);</code> If the
 * {@code groupbyfunction} has been set to a group by function, then the fields will contain a
 * key/value pair with the key set in {@code groupbykey}. E.g. {@code YCSBGB=SUM}.
 * <p>
 * Additionally with grouping enabled, fields on tag keys where group bys should occur will only
 * have the key defined and will not have a value or delimiter. E.g. if grouping on tag key "AA",
 * the field will contain {@code AA} instead of {@code AA=AB}.
 * <p>
 * <b>Downsampling</b> - Another common operation is to reduce the resolution of the queried time
 * series when fetching a wide time range of data so fewer data points are returned. For example, a
 * user may fetch a week of data but if the data is recorded on a 1 second interval, that would be
 * over 600k data points so they may ask for a 1 hour downsampling (also called bucketing) wherein
 * every hour, all of the data points for a "bucket" are aggregated into a single value.
 * <p>
 * To enable downsampling, the {@code downsamplingfunction} property must be set to a supported
 * function such as "SUM" and the {@code downsamplinginterval} must be set to a valid time interval
 * with the same units as {@code timestampunits}, e.g. "3600" which would create 1 hour buckets if
 * the time units were set to seconds. With downsampling, query fields will include a key/value pair
 * with {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being a
 * concatenation of {@code downsamplingfunction} and {@code downsamplinginterval}, for example
 * {@code YCSBDS=SUM60}.
 * <p>
 * <b>Timestamps</b> - For every read, a random timestamp is selected from the interval set. If
 * {@code querytimespan} has been set to a positive value, then the configured query time interval
 * is added to the selected timestamp so the read passes the DB a range of times. Note that during
 * the run phase, if no data was previously loaded, or if there are more {@code recordcount}s set
 * for the run phase, reads may be sent to the DB with timestamps that are beyond the written data
 * time range (or even the system clock of the DB).
 * <p>
 * <b>Deletes</b>
 * <p>
 * Because the delete API only accepts a single key, a full key and tag key/value pair map is
 * flattened into a single string for parsing by the database. Common workloads include deleting a
 * single time series (wherein all tag key and values are defined), deleting all series containing a
 * tag key and value or deleting all of the time series sharing a common time series key.
 * <p>
 * Right now the workload supports deletes with a key and for time series tag key/value pairs or a
 * key with tags and a group by on one or more tags (meaning, delete all of the series with any
 * value for the given tag key). The parameters are collapsed into a single string delimited with
 * the character in the {@code deletedelimiter} property. For example, a delete request may look
 * like: {@code AA:AA=AA:AA=AB} to delete the first time series in the table above.
 * <p>
 * <b>Threads</b>
 * <p>
 * For a multi-threaded execution, the number of time series keys set via the {@code fieldcount}
 * property, must be greater than or equal to the number of threads set via {@code threads}. This is
 * due to each thread choosing a subset of the total number of time series keys and being
 * responsible for writing values for each time series containing those keys at each timestamp. Thus
 * each thread will have its own timestamp generator, incrementing each time every time series it
 * is responsible for has had a value written.
 * <p>
 * Each thread may, however, issue reads and scans for any time series in the complete set.
 * <p>
 * <b>Sparsity</b>
 * <p>
 * By default, during loads, every time series will have a data point written at every time stamp in
 * the interval set. This is common in workloads where a sensor writes a value at regular intervals.
 * However some time series are only reported under certain conditions.
 * <p>
 * For example, a counter may track the number of errors over a time period for a web service and
 * only report when the value is greater than 1. Or a time series may include tags such as a user ID
 * and IP address when a request arrives at the web service and only report values when that
 * combination is seen. This means the timeseries will <i>not</i> have a value at every timestamp
 * and in some cases there may be only a single value!
 * <p>
 * This workload has a {@code sparsity} parameter that can choose how often a time series should
 * record a value. The default value of 0.0 means every series will get a value at every timestamp.
 * A value of 0.95 will mean that for each series, only 5% of the timestamps in the interval will
 * have a value. The distribution of values is random.
 * <p>
 * <b>Notes/Warnings</b>
 * <p>
 * <ul>
 * <li>Because time series keys and tag key/values are generated and stored in memory, be careful of
 * setting the cardinality too high for the JVM's heap.</li>
 * <li>When running for data integrity, a number of settings are incompatible and will throw errors.
 * Check the error messages for details.</li>
 * <li>Databases that support keys only and can't store tags should order and then collapse the tag
 * values using a delimiter. For example the series in the example table at the top could be written
 * as:
 * <ul>
 * <li>{@code AA.AA.AA}</li>
 * <li>{@code AA.AA.AB}</li>
 * <li>{@code AB.AA.AA}</li>
 * <li>{@code AB.AA.AB}</li>
 * </ul>
 * </li>
 * </ul>
 * <p>
 * <b>TODOs</b>
 * <p>
 * <ul>
 * <li>Support random time intervals. E.g. some series write every second, others every 60
 * seconds.</li>
 * <li>Support random time series cardinality. Right now every series has the same cardinality.</li>
 * <li>Truly random timestamps per time series. We could use bitmaps to determine if a series has had
 * a value written for a given timestamp. Right now all of the series are in sync time-wise.</li>
 * <li>Possibly a real-time load where values are written with the current system time. It's more of
 * a bulk-loading operation now.</li>
 * </ul>
 */
public class TimeSeriesWorkload extends Workload {

  /**
   * The kinds of values this workload can write to the time series store. Each type carries the
   * lower-case configuration string accepted by the {@code valuetype} property.
   */
  public enum ValueType {
    INTEGERS("integers"), FLOATS("floats"), MIXED("mixednumbers");

    protected final String name;

    ValueType(final String name) {
      this.name = name;
    }

    /**
     * Resolves a configuration string to its {@link ValueType}, ignoring case.
     * @param name The configuration value, e.g. "floats".
     * @return The matching value type.
     * @throws IllegalArgumentException if no type matches the given string.
     */
    public static ValueType fromString(final String name) {
      ValueType match = null;
      for (final ValueType candidate : values()) {
        if (candidate.name.equalsIgnoreCase(name)) {
          match = candidate;
          break;
        }
      }
      if (match == null) {
        throw new IllegalArgumentException("Unrecognized type: " + name);
      }
      return match;
    }
  }

  /** Name and default value for the timestamp key property. */
  public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey";
  public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS";

  /** Name and default value for the value key property. */
  public static final String VALUE_KEY_PROPERTY = "valuekey";
  public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV";

  /** Name and default value for the timestamp interval property. */
  public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval";
  public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60";

  /** Name and default value for the timestamp units property. */
  public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits";
  public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS";

  /** Name and default value for the number of tags property. */
  public static final String TAG_COUNT_PROPERTY = "tagcount";
  public static final String TAG_COUNT_PROPERTY_DEFAULT = "4";

  /** Name and default value for the tag value cardinality map property. */
  public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality";
  public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8";

  /** Name and default value for the tag key length property. */
  public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength";
  public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8";

  /** Name and default value for the tag value length property. */
  public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength";
  public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8";

  /** Name and default value for the tag pair delimiter property. */
  public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter";
  public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "=";

  /** Name and default value for the delete string delimiter property. */
  public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter";
  public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":";

  /** Name and default value for the random timestamp write order property. */
  public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder";
  public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false";

  /** Name and default value for the random time series write order property. */
  public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder";
  public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true";

  /** Name and default value for the value types property. */
  public static final String VALUE_TYPE_PROPERTY = "valuetype";
  public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats";

  /** Name and default value for the sparsity property. */
  public static final String SPARSITY_PROPERTY = "sparsity";
  public static final String SPARSITY_PROPERTY_DEFAULT = "0.00";

  /** Name and default value for the delayed series percentage property. */
  public static final String DELAYED_SERIES_PROPERTY = "delayedseries";
  public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10";

  /** Name and default value for the delayed series intervals property. */
  public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals";
  public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5";

  /** Name and default value for the query time span property. */
  public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan";
  public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0";

  /** Name and default value for the randomized query time span property. */
  public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan";
  public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false";

  /** Name and default value for the query time stamp delimiter property. */
  public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter";
  public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ",";

  /** Name and default value for the group-by key property. */
  public static final String GROUPBY_KEY_PROPERTY = "groupbykey";
  public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB";

  /** Name of the group-by function property. No default; group-by is disabled unless set. */
  public static final String GROUPBY_PROPERTY = "groupbyfunction";

  /**
   * Name of the group-by key map property: comma-separated integer flags, one per tag key, where a
   * non-zero value enables grouping on that tag key. Required when the group-by function is set.
   */
  public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys";

  /** Name and default value for the downsampling key property. */
  public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey";
  public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS";

  /** Name of the downsampling function property. No default; downsampling is disabled unless set. */
  public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction";

  /**
   * Name of the downsampling interval property. No default; required when the downsampling
   * function is set.
   */
  public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval";

  /** The properties to pull settings from. */
  protected Properties properties;

  /** Generators for keys, tag keys and tag values. */
  protected Generator<String> keyGenerator;
  protected Generator<String> tagKeyGenerator;
  protected Generator<String> tagValueGenerator;

  /** The timestamp key, defaults to "YCSBTS". */
  protected String timestampKey;

  /** The value key, defaults to "YCSBV". */
  protected String valueKey;

  /** The number of time units in between timestamps. */
  protected int timestampInterval;

  /** The units of time the timestamp and various intervals represent. */
  protected TimeUnit timeUnits;

  /** Whether or not to randomize the timestamp order when writing. */
  protected boolean randomizeTimestampOrder;

  /**
   * Whether or not to randomize (shuffle) the time series order. NOT compatible with data
   * integrity.
   */
  protected boolean randomizeTimeseriesOrder;

  /** The type of values to generate when writing data. */
  protected ValueType valueType;

  /** Used to calculate an offset for each time series. */
  protected int[] cumulativeCardinality;

  /** The calculated total cardinality based on the config. */
  protected int totalCardinality;

  /**
   * The calculated per-time-series-key cardinality. I.e. the number of unique tag key and value
   * combinations.
   */
  protected int perKeyCardinality;

  /** How much data to scan for in each call. */
  protected NumberGenerator scanlength;

  /** A generator used to select a random time series key per read/scan. */
  protected NumberGenerator keychooser;

  /** A generator to select what operation to perform during the run phase. */
  protected DiscreteGenerator operationchooser;

  /**
   * The maximum number of interval offsets from the starting timestamp. Calculated based on the
   * number of records configured for the run.
   */
  protected int maxOffsets;

  /** The number of records or operations to perform for this run. */
  protected int recordcount;

  /** The number of tag pairs per time series. */
  protected int tagPairs;

  /** The table we'll write to. */
  protected String table;

  /** How many time series keys will be generated. */
  protected int numKeys;

  /** The generated list of possible time series key values. */
  protected String[] keys;

  /** The generated list of possible tag key values. */
  protected String[] tagKeys;

  /** The generated list of possible tag value values. */
  protected String[] tagValues;

  /** The cardinality for each tag key. */
  protected int[] tagCardinality;

  /** A helper to skip non-incrementing tag values. */
  protected int firstIncrementableCardinality;

  /** How sparse the data written should be. */
  protected double sparsity;

  /** The percentage of time series that should be delayed in writes. */
  protected double delayedSeries;

  /** The maximum number of intervals to delay a series. */
  protected int delayedIntervals;

  /** Optional query time interval during reads/scans. */
  protected int queryTimeSpan;

  /**
   * Whether or not the actual interval should be randomly chosen, using queryTimeSpan as the
   * maximum value.
   */
  protected boolean queryRandomTimeSpan;

  /** The delimiter for tag pairs in fields. */
  protected String tagPairDelimiter;

  /** The delimiter between parameters for the delete key. */
  protected String deleteDelimiter;

  /** The delimiter between timestamps for query time spans. */
  protected String queryTimeSpanDelimiter;

  /** Whether or not to issue group-by queries. */
  protected boolean groupBy;

  /** The key used for group-by tag keys. */
  protected String groupByKey;

  /** The function used for group-by's. */
  protected String groupByFunction;

  /** The tag keys to group on. */
  protected boolean[] groupBys;

  /** Whether or not to issue downsampling queries. */
  protected boolean downsample;

  /** The key used for downsampling tag keys. */
  protected String downsampleKey;

  /** The downsampling function. */
  protected String downsampleFunction;

  /** The downsampling interval. */
  protected int downsampleInterval;

  /**
   * Set to true if want to check correctness of reads. Must also be set to true during loading
   * phase to function.
   */
  protected boolean dataintegrity;

  /** Measurements to write data integrity results to. */
  protected Measurements measurements = Measurements.getMeasurements();

  /**
   * Parses the workload configuration, wires up the operation, key and scan-length generators and
   * finally materializes the time series keys and tag values.
   * @param p The properties to pull configuration values from.
   * @throws WorkloadException if a property fails parsing or validation.
   */
  @Override
  public void init(final Properties p) throws WorkloadException {
    properties = p;
    recordcount =
        Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
    if (recordcount == 0) {
      // Zero means "unbounded"; run until the client stops us.
      recordcount = Integer.MAX_VALUE;
    }
    timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT);
    valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT);
    operationchooser = CoreWorkload.createOperationGenerator(properties);

    final int maxscanlength = Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY,
      CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
    String scanlengthdistrib = p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY,
      CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);

    if (scanlengthdistrib.compareTo("uniform") == 0) {
      scanlength = new UniformLongGenerator(1, maxscanlength);
    } else if (scanlengthdistrib.compareTo("zipfian") == 0) {
      scanlength = new ZipfianGenerator(1, maxscanlength);
    } else {
      throw new WorkloadException(
          "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
    }

    randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty(RANDOMIZE_TIMESTAMP_ORDER_PROPERTY,
      RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT));
    randomizeTimeseriesOrder =
        Boolean.parseBoolean(p.getProperty(RANDOMIZE_TIMESERIES_ORDER_PROPERTY,
          RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT));

    // setup the cardinality
    numKeys = Integer.parseInt(
      p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
    tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, TAG_COUNT_PROPERTY_DEFAULT));
    sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT));
    tagCardinality = new int[tagPairs];

    final String requestdistrib = p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY,
      CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
    if (requestdistrib.compareTo("uniform") == 0) {
      keychooser = new UniformLongGenerator(0, numKeys - 1);
    } else if (requestdistrib.compareTo("sequential") == 0) {
      keychooser = new SequentialGenerator(0, numKeys - 1);
    } else if (requestdistrib.compareTo("zipfian") == 0) {
      keychooser = new ScrambledZipfianGenerator(0, numKeys - 1);
    } else if (requestdistrib.equals("hotspot")) {
      double hotsetfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION,
        CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT));
      double hotopnfraction = Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION,
        CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT));
      keychooser = new HotspotIntegerGenerator(0, numKeys - 1, hotsetfraction, hotopnfraction);
    } else {
      throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
    }

    // figure out the start timestamp based on the units, cardinality and interval
    try {
      timestampInterval = Integer.parseInt(
        p.getProperty(TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT));
    } catch (NumberFormatException nfe) {
      throw new WorkloadException("Unable to parse the " + TIMESTAMP_INTERVAL_PROPERTY, nfe);
    }

    try {
      timeUnits = TimeUnit.valueOf(
        p.getProperty(TIMESTAMP_UNITS_PROPERTY, TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase());
    } catch (IllegalArgumentException e) {
      throw new WorkloadException("Unknown time unit type", e);
    }
    if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) {
      throw new WorkloadException("YCSB doesn't support " + timeUnits + " at this time.");
    }

    tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT);
    deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT);
    dataintegrity = Boolean.parseBoolean(p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY,
      CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT));

    queryTimeSpan =
        Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, QUERY_TIMESPAN_PROPERTY_DEFAULT));
    queryRandomTimeSpan = Boolean.parseBoolean(
      p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT));
    queryTimeSpanDelimiter =
        p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT);

    groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT);
    groupByFunction = p.getProperty(GROUPBY_PROPERTY);
    if (groupByFunction != null && !groupByFunction.isEmpty()) {
      final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY);
      if (groupByKeys == null || groupByKeys.isEmpty()) {
        throw new WorkloadException("Group by was enabled but no keys were specified.");
      }
      final String[] gbKeys = groupByKeys.split(",");
      // Compare against tagPairs, not tagKeys.length: tagKeys is only populated by
      // initKeysAndTags() below, so dereferencing it here would throw an NPE.
      if (gbKeys.length != tagPairs) {
        throw new WorkloadException("Only " + gbKeys.length + " group by keys "
            + "were specified but there were " + tagPairs + " tag keys given.");
      }
      groupBys = new boolean[gbKeys.length];
      for (int i = 0; i < gbKeys.length; i++) {
        // Non-zero flag means "group on this tag key".
        groupBys[i] = Integer.parseInt(gbKeys[i].trim()) != 0;
      }
      groupBy = true;
    }

    downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT);
    downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY);
    if (downsampleFunction != null && !downsampleFunction.isEmpty()) {
      final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY);
      if (interval == null || interval.isEmpty()) {
        throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '"
            + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set.");
      }
      try {
        downsampleInterval = Integer.parseInt(interval);
      } catch (NumberFormatException nfe) {
        throw new WorkloadException("Unable to parse the " + DOWNSAMPLING_INTERVAL_PROPERTY, nfe);
      }
      downsample = true;
    }

    delayedSeries =
        Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT));
    delayedIntervals = Integer
        .parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT));

    valueType =
        ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT));
    table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT);
    initKeysAndTags();
    validateSettings();
  }

  @Override
  public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
    // Per-thread state can only be built after init() has populated the
    // shared configuration; fail fast otherwise.
    if (properties != null) {
      return new ThreadState(mythreadid, threadcount);
    }
    throw new WorkloadException("Workload has not been initialized.");
  }

  /**
   * Writes the next data point for the calling thread.
   * @param db The DB binding to write through.
   * @param threadstate The {@link ThreadState} created by {@link #initThread}.
   * @return True if the insert returned {@link Status#OK}, false otherwise.
   */
  @Override
  public boolean doInsert(DB db, Object threadstate) {
    if (threadstate == null) {
      throw new IllegalStateException("Missing thread state.");
    }
    // TreeMap keeps tag keys in sorted order for a deterministic encoding.
    final Map<String, ByteIterator> tags = new TreeMap<String, ByteIterator>();
    final String key = ((ThreadState) threadstate).nextDataPoint(tags, true);
    // Collapsed the redundant if-return-true/return-false into one expression.
    return db.insert(table, key, tags) == Status.OK;
  }

  /**
   * Dispatches one transaction based on the weighted operation chooser.
   * An unrecognized operation is reported as a failed transaction.
   */
  @Override
  public boolean doTransaction(DB db, Object threadstate) {
    if (threadstate == null) {
      throw new IllegalStateException("Missing thread state.");
    }
    switch (operationchooser.nextString()) {
      case "READ":
        doTransactionRead(db, threadstate);
        return true;
      case "UPDATE":
        doTransactionUpdate(db, threadstate);
        return true;
      case "INSERT":
        doTransactionInsert(db, threadstate);
        return true;
      case "SCAN":
        doTransactionScan(db, threadstate);
        return true;
      case "DELETE":
        doTransactionDelete(db, threadstate);
        return true;
      default:
        return false;
    }
  }

  /**
   * Executes a single read against a randomly selected time series, encoding
   * the chosen tags, timestamp (or time range) and optional group-by /
   * downsampling settings into the field set, then verifies the result when
   * data integrity checking is enabled.
   */
  protected void doTransactionRead(final DB db, Object threadstate) {
    final ThreadState state = (ThreadState) threadstate;
    // Choose the series key via the configured key chooser distribution.
    final String keyname = keys[keychooser.nextValue().intValue()];
    final Random random = ThreadLocalRandom.current();
    // Offset, in timestamp intervals, from the thread's starting timestamp.
    int offsets = state.queryOffsetGenerator.nextValue().intValue();
    // int offsets = random.nextInt(maxOffsets - 1);
    final long startTimestamp;
    if (offsets > 0) {
      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
    } else {
      startTimestamp = state.startTimestamp;
    }

    // Random tag values; tags selected for group-by are passed bare (key only).
    Set<String> fields = new HashSet<String>();
    for (int i = 0; i < tagPairs; ++i) {
      if (groupBy && groupBys[i]) {
        fields.add(tagKeys[i]);
      } else {
        fields.add(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]);
      }
    }

    // Encode either a [start, end] time range or a single timestamp field.
    if (queryTimeSpan > 0) {
      final long endTimestamp;
      if (queryRandomTimeSpan) {
        // NOTE(review): nextInt() throws IllegalArgumentException when
        // queryTimeSpan < timestampInterval (bound of 0) — presumably the
        // config guarantees queryTimeSpan >= timestampInterval; confirm.
        endTimestamp = startTimestamp
            + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
      } else {
        endTimestamp = startTimestamp + queryTimeSpan;
      }
      fields.add(
        timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
    } else {
      fields.add(timestampKey + tagPairDelimiter + startTimestamp);
    }
    if (groupBy) {
      fields.add(groupByKey + tagPairDelimiter + groupByFunction);
    }
    if (downsample) {
      // NOTE(review): unlike doTransactionScan(), no delimiter separates the
      // downsample function from the interval here — confirm which encoding
      // the DB bindings actually parse.
      fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval);
    }

    final Map<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
    final Status status = db.read(table, keyname, fields, cells);

    // Only rows that were actually read can be verified.
    if (dataintegrity && status == Status.OK) {
      verifyRow(keyname, cells);
    }
  }

  /**
   * Generates the next data point for this thread and pushes it as an update.
   * Passing isInsert=false lets the generator pick an older timestamp when
   * timestamp randomization is disabled.
   */
  protected void doTransactionUpdate(final DB db, Object threadstate) {
    if (threadstate == null) {
      throw new IllegalStateException("Missing thread state.");
    }
    // Sorted map so the encoded tag pairs come out in a deterministic order.
    final Map<String, ByteIterator> cells = new TreeMap<String, ByteIterator>();
    final String rowKey = ((ThreadState) threadstate).nextDataPoint(cells, false);
    db.update(table, rowKey, cells);
  }

  /**
   * Inserts a new data point during the transaction phase by delegating to
   * {@link #doInsert(DB, Object)}; the returned success flag is ignored here.
   */
  protected void doTransactionInsert(final DB db, Object threadstate) {
    doInsert(db, threadstate);
  }

  /**
   * Scans a randomly chosen time series starting at a random timestamp
   * offset, using the same tag/timestamp field encoding as reads. Results are
   * collected but not verified.
   */
  protected void doTransactionScan(final DB db, Object threadstate) {
    final ThreadState state = (ThreadState) threadstate;
    final Random random = ThreadLocalRandom.current();
    // Unlike reads, scans pick the series key uniformly at random.
    final String keyname = keys[random.nextInt(keys.length)];

    // choose a random scan length
    int len = scanlength.nextValue().intValue();

    int offsets = random.nextInt(maxOffsets - 1);
    final long startTimestamp;
    if (offsets > 0) {
      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
    } else {
      startTimestamp = state.startTimestamp;
    }

    // Random tag values; tags selected for group-by are passed bare (key only).
    Set<String> fields = new HashSet<String>();
    for (int i = 0; i < tagPairs; ++i) {
      if (groupBy && groupBys[i]) {
        fields.add(tagKeys[i]);
      } else {
        fields.add(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]);
      }
    }

    // Encode either a [start, end] time range or a single timestamp field.
    if (queryTimeSpan > 0) {
      final long endTimestamp;
      if (queryRandomTimeSpan) {
        endTimestamp = startTimestamp
            + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
      } else {
        endTimestamp = startTimestamp + queryTimeSpan;
      }
      fields.add(
        timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
    } else {
      fields.add(timestampKey + tagPairDelimiter + startTimestamp);
    }
    if (groupBy) {
      fields.add(groupByKey + tagPairDelimiter + groupByFunction);
    }
    if (downsample) {
      // NOTE(review): here the interval IS delimited from the function,
      // unlike doTransactionRead() — confirm which encoding is intended.
      fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter
          + downsampleInterval);
    }

    final Vector<HashMap<String, ByteIterator>> results =
        new Vector<HashMap<String, ByteIterator>>();
    db.scan(table, keyname, len, fields, results);
  }

  /**
   * Deletes a randomly chosen time series at a random timestamp offset. Since
   * the delete API takes only a key, the tag pairs and the timestamp (or time
   * range) are packed into the key itself, joined by the delete delimiter.
   */
  protected void doTransactionDelete(final DB db, Object threadstate) {
    final ThreadState state = (ThreadState) threadstate;
    final Random random = ThreadLocalRandom.current();
    final StringBuilder buf = new StringBuilder().append(keys[random.nextInt(keys.length)]);

    int offsets = random.nextInt(maxOffsets - 1);
    final long startTimestamp;
    if (offsets > 0) {
      startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets);
    } else {
      startTimestamp = state.startTimestamp;
    }

    // Random tag values; tags selected for group-by are appended bare (key only).
    for (int i = 0; i < tagPairs; ++i) {
      if (groupBy && groupBys[i]) {
        buf.append(deleteDelimiter).append(tagKeys[i]);
      } else {
        buf.append(deleteDelimiter)
            .append(tagKeys[i] + tagPairDelimiter + tagValues[random.nextInt(tagCardinality[i])]);
      }
    }

    // Append either a [start, end] time range or a single timestamp.
    if (queryTimeSpan > 0) {
      final long endTimestamp;
      if (queryRandomTimeSpan) {
        endTimestamp = startTimestamp
            + (timestampInterval * random.nextInt(queryTimeSpan / timestampInterval));
      } else {
        endTimestamp = startTimestamp + queryTimeSpan;
      }
      buf.append(deleteDelimiter).append(
        timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp);
    } else {
      buf.append(deleteDelimiter).append(timestampKey + tagPairDelimiter + startTimestamp);
    }

    db.delete(table, buf.toString());
  }

  /**
   * Parses the values returned by a read or scan operation and determines whether or not the
   * integer value matches the hash and timestamp of the original timestamp. Only works for raw data
   * points, will not work for group-by's or downsampled data.
   * @param key The time series key.
   * @param cells The cells read by the DB.
   * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if the data
   *         did not match.
   */
  protected Status verifyRow(final String key, final Map<String, ByteIterator> cells) {
    Status verifyStatus = Status.UNEXPECTED_STATE;
    long startTime = System.nanoTime();

    double value = 0;
    long timestamp = 0;
    final TreeMap<String, String> validationTags = new TreeMap<String, String>();
    // Split the returned cells into the timestamp, the data point value and
    // the tag pairs that feed the validation hash.
    for (final Entry<String, ByteIterator> entry : cells.entrySet()) {
      if (entry.getKey().equals(timestampKey)) {
        final NumericByteIterator it = (NumericByteIterator) entry.getValue();
        timestamp = it.getLong();
      } else if (entry.getKey().equals(valueKey)) {
        final NumericByteIterator it = (NumericByteIterator) entry.getValue();
        value = it.isFloatingPoint() ? it.getDouble() : it.getLong();
      } else {
        validationTags.put(entry.getKey(), entry.getValue().toString());
      }
    }

    if (validationFunction(key, timestamp, validationTags) == value) {
      verifyStatus = Status.OK;
    }
    long endTime = System.nanoTime();
    // Convert nanos to micros. Divide BEFORE narrowing to int: the previous
    // cast-then-divide order truncated the 64-bit elapsed nanos first, which
    // overflowed for verifications taking longer than ~2.1 seconds.
    measurements.measure("VERIFY", (int) ((endTime - startTime) / 1000));
    measurements.reportStatus("VERIFY", verifyStatus);
    return verifyStatus;
  }

  /**
   * Function used for generating a deterministic hash based on the combination of metric, tags and
   * timestamp.
   * @param key A non-null string representing the key.
   * @param timestamp A timestamp in the proper units for the workload.
   * @param tags A non-null map of tag keys and values NOT including the YCSB key or timestamp.
   * @return A hash value as an 8 byte integer.
   */
  protected long validationFunction(final String key, final long timestamp,
      final TreeMap<String, String> tags) {
    // Rough capacity estimate to cut down on StringBuilder resizing.
    final int sizeHint =
        keys[0].length() + (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1]);
    final StringBuilder buffer = new StringBuilder(sizeHint);
    for (final Entry<String, String> tag : tags.entrySet()) {
      buffer.append(tag.getKey());
      buffer.append(tag.getValue());
    }
    // Sign-extend the 32-bit string hash to a long, then fold in the timestamp.
    final long tagHash = buffer.toString().hashCode();
    return tagHash ^ timestamp;
  }

  /**
   * Breaks out the keys, tags and cardinality initialization in another method to keep CheckStyle
   * happy. Parses the configured tag cardinalities, generates the key/tag-key/tag-value strings
   * and precomputes the cumulative cardinality table used to index series.
   * @throws WorkloadException If something goes pear shaped.
   */
  protected void initKeysAndTags() throws WorkloadException {
    final int keyLength =
        Integer.parseInt(properties.getProperty(CoreWorkload.FIELD_LENGTH_PROPERTY,
          CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT));
    final int tagKeyLength = Integer
        .parseInt(properties.getProperty(TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT));
    final int tagValueLength = Integer.parseInt(
      properties.getProperty(TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT));

    keyGenerator = new IncrementingPrintableStringGenerator(keyLength);
    tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength);
    tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength);

    // Parse the per-tag cardinalities, tracking the total and per-key products
    // and the largest single cardinality (sizes the tag value pool).
    final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1"));
    final String tagCardinalityString =
        properties.getProperty(TAG_CARDINALITY_PROPERTY, TAG_CARDINALITY_PROPERTY_DEFAULT);
    final String[] tagCardinalityParts = tagCardinalityString.split(",");
    int idx = 0;
    totalCardinality = numKeys;
    perKeyCardinality = 1;
    int maxCardinality = 0;
    for (final String card : tagCardinalityParts) {
      try {
        tagCardinality[idx] = Integer.parseInt(card.trim());
      } catch (NumberFormatException nfe) {
        throw new WorkloadException("Unable to parse cardinality: " + card, nfe);
      }
      if (tagCardinality[idx] < 1) {
        throw new WorkloadException(
            "Cardinality must be greater than zero: " + tagCardinality[idx]);
      }
      totalCardinality *= tagCardinality[idx];
      perKeyCardinality *= tagCardinality[idx];
      if (tagCardinality[idx] > maxCardinality) {
        maxCardinality = tagCardinality[idx];
      }
      ++idx;
      if (idx >= tagPairs) {
        // we have more cardinalities than tag keys so bail at this point.
        break;
      }
    }
    if (numKeys < threads) {
      throw new WorkloadException("Field count " + numKeys + " (keys for time "
          + "series workloads) must be greater or equal to the number of " + "threads " + threads);
    }

    // Tags without an explicit cardinality default to 1. Loop (rather than a
    // single assignment) so EVERY remaining slot is filled; previously only
    // the first unfilled slot was set, leaving the rest at 0 and causing
    // random.nextInt(0) to throw later at query time.
    while (idx < tagPairs) {
      tagCardinality[idx++] = 1;
    }

    // Remember the first tag position whose values actually rotate.
    for (int i = 0; i < tagCardinality.length; ++i) {
      if (tagCardinality[i] > 1) {
        firstIncrementableCardinality = i;
        break;
      }
    }

    keys = new String[numKeys];
    tagKeys = new String[tagPairs];
    tagValues = new String[maxCardinality];
    for (int i = 0; i < numKeys; ++i) {
      keys[i] = keyGenerator.nextString();
    }

    for (int i = 0; i < tagPairs; ++i) {
      tagKeys[i] = tagKeyGenerator.nextString();
    }

    for (int i = 0; i < maxCardinality; i++) {
      tagValues[i] = tagValueGenerator.nextString();
    }
    if (randomizeTimeseriesOrder) {
      Utils.shuffleArray(keys);
      Utils.shuffleArray(tagValues);
    }

    // Number of timestamp intervals needed to cover the record count.
    maxOffsets = (recordcount / totalCardinality) + 1;
    final int[] keyAndTagCardinality = new int[tagPairs + 1];
    keyAndTagCardinality[0] = numKeys;
    for (int i = 0; i < tagPairs; i++) {
      keyAndTagCardinality[i + 1] = tagCardinality[i];
    }

    // cumulativeCardinality[i] ends up holding the product of the
    // cardinalities "to the right" of position i, so a series' overall index
    // can be computed from its key and tag value indices.
    cumulativeCardinality = new int[keyAndTagCardinality.length];
    for (int i = 0; i < keyAndTagCardinality.length; i++) {
      int cumulation = 1;
      for (int x = i; x <= keyAndTagCardinality.length - 1; x++) {
        cumulation *= keyAndTagCardinality[x];
      }
      if (i > 0) {
        cumulativeCardinality[i - 1] = cumulation;
      }
    }
    cumulativeCardinality[cumulativeCardinality.length - 1] = 1;
  }

  /**
   * Makes sure the settings as given are compatible.
   * @throws WorkloadException If one or more settings were invalid.
   */
  protected void validateSettings() throws WorkloadException {
    // Only data-integrity mode imposes extra constraints; everything else is
    // accepted as-is.
    if (!dataintegrity) {
      return;
    }
    if (valueType != ValueType.INTEGERS) {
      throw new WorkloadException(
          "Data integrity was enabled. 'valuetype' must be set to 'integers'.");
    }
    if (groupBy) {
      throw new WorkloadException(
          "Data integrity was enabled. 'groupbyfunction' must be empty or null.");
    }
    if (downsample) {
      throw new WorkloadException(
          "Data integrity was enabled. 'downsamplingfunction' must be empty or null.");
    }
    if (queryTimeSpan > 0) {
      throw new WorkloadException(
          "Data integrity was enabled. 'querytimespan' must be empty or 0.");
    }
    if (randomizeTimeseriesOrder) {
      throw new WorkloadException(
          "Data integrity was enabled. 'randomizetimeseriesorder' must be false.");
    }
    // Verification recomputes hashes from the configured start time, so an
    // explicit 'insertstart' is mandatory in this mode.
    final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
    if (startTimestamp == null || startTimestamp.isEmpty()) {
      throw new WorkloadException(
          "Data integrity was enabled. 'insertstart' must be set to a Unix Epoch timestamp.");
    }
  }

  /**
   * Thread state class holding thread local generators and indices.
   */
  protected class ThreadState {
    /** The timestamp generator for this thread. */
    protected final UnixEpochTimestampGenerator timestampGenerator;

    /** An offset generator to select a random offset for queries. */
    protected final NumberGenerator queryOffsetGenerator;

    /** The current write key index. */
    protected int keyIdx;

    /** The starting fence for writing keys. */
    protected int keyIdxStart;

    /** The ending fence for writing keys. */
    protected int keyIdxEnd;

    /** Indices for each tag value for writes. */
    protected int[] tagValueIdxs;

    /** Whether or not all time series have written values for the current timestamp. */
    protected boolean rollover;

    /** The starting timestamp. */
    protected long startTimestamp;

    /**
     * Default ctor.
     * @param threadID The zero based thread ID.
     * @param threadCount The total number of threads.
     * @throws WorkloadException If something went pear shaped.
     */
    protected ThreadState(final int threadID, final int threadCount) throws WorkloadException {
      int totalThreads = threadCount > 0 ? threadCount : 1;

      if (threadID >= totalThreads) {
        throw new IllegalStateException("Thread ID " + threadID + " cannot be greater "
            + "than or equal to the thread count " + totalThreads);
      }
      // Every thread needs at least one key to own. The original message had
      // this requirement inverted ("thread count must be >= key count").
      if (keys.length < threadCount) {
        throw new WorkloadException("Key count " + keys.length + " must be greater "
            + "than or equal to the thread count " + totalThreads);
      }

      // Partition the key space evenly across threads; the last thread picks
      // up any remainder.
      int keysPerThread = keys.length / totalThreads;
      keyIdx = keysPerThread * threadID;
      keyIdxStart = keyIdx;
      if (totalThreads - 1 == threadID) {
        keyIdxEnd = keys.length;
      } else {
        keyIdxEnd = keyIdxStart + keysPerThread;
      }

      tagValueIdxs = new int[tagPairs]; // all zeros

      // Without an explicit 'insertstart' the generator starts from the
      // current time; otherwise it starts from the configured epoch value.
      final String startingTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY);
      if (startingTimestamp == null || startingTimestamp.isEmpty()) {
        timestampGenerator = randomizeTimestampOrder
            ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets)
            : new UnixEpochTimestampGenerator(timestampInterval, timeUnits);
      } else {
        try {
          timestampGenerator = randomizeTimestampOrder
              ? new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits,
                  Long.parseLong(startingTimestamp), maxOffsets)
              : new UnixEpochTimestampGenerator(timestampInterval, timeUnits,
                  Long.parseLong(startingTimestamp));
        } catch (NumberFormatException nfe) {
          throw new WorkloadException("Unable to parse the " + CoreWorkload.INSERT_START_PROPERTY,
              nfe);
        }
      }
      // Set the last value properly for the timestamp, otherwise it may start
      // one interval ago.
      startTimestamp = timestampGenerator.nextValue();
      // TODO - pick it
      queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2);
    }

    /**
     * Generates the next write value for thread.
     * @param map An initialized map to populate with tag keys and values as well as the timestamp
     *          and actual value.
     * @param isInsert Whether or not it's an insert or an update. Updates will pick an older
     *          timestamp (if random isn't enabled).
     * @return The next key to write.
     */
    protected String nextDataPoint(final Map<String, ByteIterator> map, final boolean isInsert) {
      final Random random = ThreadLocalRandom.current();
      // Sparsity skips ahead a random number of series combinations so that
      // not every series gets a value at every interval.
      int iterations =
          sparsity <= 0 ? 1 : random.nextInt((int) ((double) perKeyCardinality * sparsity));
      if (iterations < 1) {
        iterations = 1;
      }
      while (true) {
        iterations--;
        // Advance the timestamp once all series have been written for the
        // previous interval.
        if (rollover) {
          timestampGenerator.nextValue();
          rollover = false;
        }
        String key = null;
        if (iterations <= 0) {
          final TreeMap<String, String> validationTags;
          if (dataintegrity) {
            validationTags = new TreeMap<String, String>();
          } else {
            validationTags = null;
          }
          key = keys[keyIdx];
          // Overall index of this series across the full key/tag space, used
          // to decide whether it falls into a delayed bucket.
          int overallIdx = keyIdx * cumulativeCardinality[0];
          for (int i = 0; i < tagPairs; ++i) {
            int tvidx = tagValueIdxs[i];
            map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx]));
            if (dataintegrity) {
              validationTags.put(tagKeys[i], tagValues[tvidx]);
            }
            if (delayedSeries > 0) {
              overallIdx += (tvidx * cumulativeCardinality[i + 1]);
            }
          }

          if (!isInsert) {
            // Updates pick a random timestamp in the already-written range.
            // NOTE(review): nextInt() throws if delta == 0 (an update issued
            // before any interval has elapsed) — confirm updates cannot run
            // before the first insert interval completes.
            final long delta =
                (timestampGenerator.currentValue() - startTimestamp) / timestampInterval;
            final int intervals = random.nextInt((int) delta);
            map.put(timestampKey,
              new NumericByteIterator(startTimestamp + (intervals * timestampInterval)));
          } else if (delayedSeries > 0) {
            // See if the series falls in a delay bucket and calculate an offset earlier
            // than the current timestamp value if so.
            double pct = (double) overallIdx / (double) totalCardinality;
            if (pct < delayedSeries) {
              int modulo = overallIdx % delayedIntervals;
              if (modulo < 0) {
                modulo *= -1;
              }
              map.put(timestampKey, new NumericByteIterator(
                  timestampGenerator.currentValue() - timestampInterval * modulo));
            } else {
              map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
            }
          } else {
            map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue()));
          }

          if (dataintegrity) {
            // The stored value is the deterministic hash so reads can verify it.
            map.put(valueKey, new NumericByteIterator(
                validationFunction(key, timestampGenerator.currentValue(), validationTags)));
          } else {
            switch (valueType) {
              case INTEGERS:
                map.put(valueKey, new NumericByteIterator(random.nextInt()));
                break;
              case FLOATS:
                map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000));
                break;
              case MIXED:
                if (random.nextBoolean()) {
                  map.put(valueKey, new NumericByteIterator(random.nextInt()));
                } else {
                  map.put(valueKey, new NumericByteIterator(random.nextDouble() * (double) 100000));
                }
                break;
              default:
                throw new IllegalStateException("Somehow we didn't have a value "
                    + "type configured that we support: " + valueType);
            }
          }
        }

        // Odometer-style increment of the tag value indices, from the least
        // significant (rightmost) tag towards the most significant.
        boolean tagRollover = false;
        for (int i = tagCardinality.length - 1; i >= 0; --i) {
          if (tagCardinality[i] <= 1) {
            tagRollover = true; // Only one tag so needs roll over.
            continue;
          }

          if (tagValueIdxs[i] + 1 >= tagCardinality[i]) {
            tagValueIdxs[i] = 0;
            if (i == firstIncrementableCardinality) {
              tagRollover = true;
            }
          } else {
            ++tagValueIdxs[i];
            break;
          }
        }

        // All tag combinations exhausted: move to the next key, and once the
        // key fence wraps, flag a timestamp rollover for the next call.
        if (tagRollover) {
          if (keyIdx + 1 >= keyIdxEnd) {
            keyIdx = keyIdxStart;
            rollover = true;
          } else {
            ++keyIdx;
          }
        }

        if (iterations <= 0) {
          return key;
        }
      }
    }
  }

}